Streaming & Cancellation

Understanding interruption, priority channels, and graceful shutdown

HPD-Agent provides two distinct mechanisms for stopping agent execution: CancellationToken (hard stop) and Interruption (graceful stop). Understanding when to use each is critical for building responsive UIs.

CancellationToken vs Interruption

There are two ways to stop an agent. The table below shows when to use each:

| Method | Use When | What Happens | User Experience |
| --- | --- | --- | --- |
| CancellationToken | User hits Ctrl+C, emergency stop, timeout | Throws OperationCanceledException, entire loop stops immediately | Nuclear option - everything stops now |
| Interruption | User clicks "Stop" button in UI | Drops CanInterrupt=true events (text deltas), delivers completion events | Graceful stop - gets MessageTurnFinishedEvent |

When to Use Which

  • Ctrl+C in console: CancellationToken (user wants immediate exit)
  • Stop button in web UI: Interruption (graceful, clean UI state)
  • Timeout: CancellationToken (hard deadline)
  • User changes mind mid-stream: Interruption (get completion events)

CancellationToken (Hard Stop)

Console Application Pattern

csharp
using var cts = new CancellationTokenSource();

// Hook Ctrl+C
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;  // Prevent immediate process exit
    cts.Cancel();     // Trigger cancellation
};

try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        if (evt is IObservabilityEvent) continue;

        switch (evt)
        {
            case TextDeltaEvent delta:
                Console.Write(delta.Text);
                break;

            case MessageTurnFinishedEvent:
                Console.WriteLine("\n✓ Done");
                break;
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("\n⚠ Cancelled by user");
}
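
To see why the `catch` block is needed, the following self-contained sketch swaps `agent.RunAsync` for a hypothetical `FakeRunAsync` iterator (a stand-in written for this example, not part of HPD-Agent). It assumes the real stream observes its token in a similar way: once the token fires, the next read throws and the loop body never sees a completion item.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Cancel after 120 ms to simulate Ctrl+C; the delay is arbitrary.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(120));
var received = 0;

try
{
    await foreach (var chunk in Demo.FakeRunAsync(cts.Token))
    {
        received++;
        Console.Write(chunk);
    }
    Console.WriteLine("\nFinished without cancellation");
}
catch (OperationCanceledException)
{
    // No "finished" item is delivered; control jumps straight here.
    Console.WriteLine($"\nCancelled after {received} chunks");
}

static class Demo
{
    // Hypothetical stand-in for agent.RunAsync: yields a chunk every 50 ms
    // and observes the token through Task.Delay.
    public static async IAsyncEnumerable<string> FakeRunAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (var i = 0; i < 100; i++)
        {
            await Task.Delay(50, ct); // throws TaskCanceledException on cancel
            yield return $"chunk{i} ";
        }
    }
}
```

`TaskCanceledException` derives from `OperationCanceledException`, so a single `catch` covers both.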

Web Application Pattern

csharp
// In your ASP.NET endpoint
app.MapPost("/agent/stream", async (
    HttpContext context,
    MessageRequest request) =>
{
    // Use HTTP request cancellation token
    var ct = context.RequestAborted;

    // Stream the response as server-sent events
    context.Response.ContentType = "text/event-stream";

    try
    {
        await foreach (var evt in agent.RunAsync(request.Messages, cancellationToken: ct))
        {
            // If client disconnects, ct triggers and stream stops
            var json = AgentEventSerializer.ToJson(evt);
            await context.Response.WriteAsync($"data: {json}\n\n", ct);
            await context.Response.Body.FlushAsync(ct);
        }
    }
    catch (OperationCanceledException)
    {
        // Client disconnected - clean up silently
    }
});

Timeout Pattern

csharp
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));

try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        if (evt is IObservabilityEvent) continue;
        await HandleEventAsync(evt);
    }
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
    Console.WriteLine("\n⚠ Operation timed out after 5 minutes");
}

Interruption (Graceful Stop)

Note: Stream interruption is an advanced feature. Most applications should use CancellationToken for simplicity. Use interruption when you need graceful completion events in your UI.

Understanding Interruption

Interruption drops interruptible events (like text deltas) while still delivering completion events:

Events with CanInterrupt = true (dropped on interruption):

  • TextDeltaEvent
  • ReasoningDeltaEvent
  • ToolCallArgsEvent
  • Most observability events

Events with CanInterrupt = false (always delivered):

  • MessageTurnFinishedEvent
  • TextMessageEndEvent
  • ToolCallEndEvent
  • Error events
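
The drop rule can be modelled in a few lines. This is a minimal sketch under stated assumptions: `Evt` and `InterruptGate` are hypothetical types invented for illustration, and the real framework applies the rule inside its event coordinator rather than in caller code.

```csharp
using System;
using System.Collections.Generic;

var source = new List<Evt>
{
    new("TextDelta:Hel", CanInterrupt: true),
    new("TextDelta:lo", CanInterrupt: true),
    new("TextDelta:wor", CanInterrupt: true),
    new("TextDelta:ld", CanInterrupt: true),
    new("TextMessageEnd", CanInterrupt: false),
    new("MessageTurnFinished", CanInterrupt: false),
};

var gate = new InterruptGate();
var seen = 0;
foreach (var evt in gate.Filter(source))
{
    Console.WriteLine(evt.Name);
    if (++seen == 2) gate.Interrupt(); // simulate "Stop" after two deltas
}
Console.WriteLine($"Dropped: {gate.DroppedCount}");

// Hypothetical event shape; the real AgentEvent carries more fields.
record Evt(string Name, bool CanInterrupt);

// Hypothetical gate: once interrupted, interruptible events are dropped.
class InterruptGate
{
    public bool IsInterrupted { get; private set; }
    public int DroppedCount { get; private set; }

    public void Interrupt() => IsInterrupted = true;

    public IEnumerable<Evt> Filter(IEnumerable<Evt> events)
    {
        foreach (var evt in events)
        {
            if (IsInterrupted && evt.CanInterrupt)
            {
                DroppedCount++;
                continue;
            }
            yield return evt;
        }
    }
}
```

The two remaining text deltas are dropped, while `TextMessageEnd` and `MessageTurnFinished` still reach the consumer, which is what lets a UI return to a clean state.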

Basic Interruption Example

csharp
// Access the stream registry (requires custom setup)
var streamRegistry = agent.GetStreamRegistry(); // Custom extension

// When user clicks "Stop" button
stopButton.Click += (s, e) =>
{
    streamRegistry.InterruptAll();
};

await foreach (var evt in agent.RunAsync(messages))
{
    if (evt is IObservabilityEvent) continue;

    switch (evt)
    {
        case TextDeltaEvent delta:
            // These get dropped after interruption
            Console.Write(delta.Text);
            break;

        case EventDroppedEvent dropped:
            // Observability: track what was dropped
            Console.WriteLine($"\n[Dropped {dropped.Count} events]");
            break;

        case MessageTurnFinishedEvent:
            // This ALWAYS arrives, even after interruption
            Console.WriteLine("\n✓ Done");
            break;
    }
}

Selective Interruption

Interrupt specific streams:

csharp
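// Interrupt streams matching a predicate
// (Duration does not appear in the IStreamHandle listing later on this page;
//  this example assumes the concrete handle exposes it)
streamRegistry.InterruptWhere(stream =>
    stream.EmittedCount > 1000 || stream.Duration > TimeSpan.FromSeconds(30));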

// Interrupt specific stream by ID
var stream = streamRegistry.Get("stream-123");
stream?.Interrupt();

Priority-Based Event Routing

For most users: You don't need to set priority manually, because the defaults cover typical use. This section explains why your stop button responds promptly even during heavy streaming.

How Priority Channels Work

┌──────────────────────────────────────────────────────────────┐
│            BidirectionalEventCoordinator                     │
│                                                              │
│  Priority Channel (capacity: 64)                             │
│  ├── Immediate (0): User cancellation, emergency stops       │
│  └── Control (1): State changes, interruption acks           │
│                                                              │
│  Standard Channel (unbounded)                                │
│  ├── Normal (2): TextDelta, ToolResult (DEFAULT)             │
│  └── Background (3): Metrics, telemetry                      │
│                                                              │
│  Upstream Channel (capacity: 64)                             │
│  └── Interruption signals flowing back through middleware    │
│                                                              │
│  ReadAllAsync() ← Merges channels in priority order          │
└──────────────────────────────────────────────────────────────┘

Why Stop Buttons Feel Instant

  1. Agent emits 1000 TextDeltaEvent events (Normal priority) → Standard channel
  2. User clicks "Stop" → InterruptionRequestEvent (Immediate priority) → Priority channel
  3. Coordinator reads Priority channel FIRST → Interruption processed immediately
  4. Remaining Normal events in Standard channel are dropped (if CanInterrupt=true)

Result: the stop request never queues behind pending text deltas, so the stop button feels instant.
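
The read order described above can be sketched with `System.Threading.Channels`. This is an illustration only, not the coordinator's actual code: the `Merger` class, the string payloads, and the two-channel setup are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var priorityChannel = Channel.CreateBounded<string>(64);
var standardChannel = Channel.CreateUnbounded<string>();

// 1000 text deltas are queued first...
for (var i = 0; i < 1000; i++) standardChannel.Writer.TryWrite($"delta {i}");
// ...then the stop request arrives on the priority channel.
priorityChannel.Writer.TryWrite("interrupt");
priorityChannel.Writer.Complete();
standardChannel.Writer.Complete();

var position = 0;
await foreach (var item in Merger.ReadAllAsync(priorityChannel.Reader, standardChannel.Reader))
{
    position++;
    if (item == "interrupt")
    {
        Console.WriteLine($"interrupt read at position {position}");
        break;
    }
}

static class Merger
{
    public static async IAsyncEnumerable<string> ReadAllAsync(
        ChannelReader<string> priority, ChannelReader<string> standard)
    {
        // A task that never completes, used as a placeholder for a closed channel.
        var never = new TaskCompletionSource<bool>().Task;
        var priorityOpen = true;
        var standardOpen = true;

        while (priorityOpen || standardOpen)
        {
            // Always drain the priority channel before touching standard items.
            if (priority.TryRead(out var urgent)) { yield return urgent; continue; }
            if (standard.TryRead(out var normal)) { yield return normal; continue; }

            // Both empty: wait until an open channel has data or completes.
            var p = priorityOpen ? priority.WaitToReadAsync().AsTask() : never;
            var s = standardOpen ? standard.WaitToReadAsync().AsTask() : never;
            await await Task.WhenAny(p, s); // inner await rethrows channel faults

            // WaitToReadAsync returns false once a channel is completed and drained.
            if (p.IsCompletedSuccessfully && !p.Result) priorityOpen = false;
            if (s.IsCompletedSuccessfully && !s.Result) standardOpen = false;
        }
    }
}
```

Although the interrupt was written after all 1000 deltas, the reader sees it first because the priority channel is checked before the standard one on every iteration.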

Priority Enum

csharp
public enum EventPriority
{
    Immediate = 0,  // User cancellation, emergency stops
    Control = 1,    // System control signals, interruption acks
    Normal = 2,     // Standard data flow (DEFAULT)
    Background = 3  // Metrics, telemetry, observability
}

Most events use Normal priority by default. The framework automatically assigns Immediate to interruption requests.

Event Stream Properties

All events inherit these streaming properties:

csharp
public abstract record AgentEvent
{
    // Stream control
    public EventPriority Priority { get; init; } = EventPriority.Normal;
    public string? StreamId { get; init; }
    public bool CanInterrupt { get; init; } = true;
    public EventDirection Direction { get; init; } = EventDirection.Downstream;
    public long SequenceNumber { get; internal set; }

    // Nested agent tracking
    public AgentExecutionContext? ExecutionContext { get; init; }
}

Event Direction

csharp
public enum EventDirection
{
    Downstream,  // Normal: input → processing → output
    Upstream     // Control: cancellation signals flowing back
}

Upstream events flow through middleware in reverse order, enabling middleware to respond to interruptions.
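
As a rough illustration of the reverse ordering, the sketch below routes a signal through a list of middleware names in both directions. The `Pipeline` class, the `Direction` enum, and the middleware names are hypothetical and exist only for this example; the framework's own middleware types are not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var pipeline = new Pipeline(new[] { "Logging", "Retry", "ToolExecution" });

Console.WriteLine(string.Join(" -> ", pipeline.Route(Direction.Downstream)));
Console.WriteLine(string.Join(" -> ", pipeline.Route(Direction.Upstream)));

// Mirrors the two values of EventDirection for the purpose of this sketch.
enum Direction { Downstream, Upstream }

class Pipeline
{
    private readonly IReadOnlyList<string> _middleware;

    public Pipeline(IEnumerable<string> middleware) => _middleware = middleware.ToList();

    // Downstream visits middleware in registration order; upstream reverses it.
    public IEnumerable<string> Route(Direction direction) =>
        direction == Direction.Downstream ? _middleware : _middleware.Reverse();
}
```

With this ordering, the middleware closest to the output is the first to learn about an interruption and can release its resources before outer layers react.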

Stream Management Interfaces

Advanced Feature: These interfaces are for framework developers and advanced scenarios.

IStreamRegistry

csharp
public interface IStreamRegistry
{
    IStreamHandle Create(string? streamId = null);
    IStreamHandle? Get(string streamId);
    void InterruptAll();
    void InterruptWhere(Func<IStreamHandle, bool> predicate);

    IReadOnlyList<IStreamHandle> ActiveStreams { get; }
    int ActiveCount { get; }
}

IStreamHandle

csharp
public interface IStreamHandle
{
    string StreamId { get; }
    bool IsInterrupted { get; }
    bool IsCompleted { get; }
    int EmittedCount { get; }
    int DroppedCount { get; }

    void Interrupt();
    void Complete();
    Task WaitAsync(CancellationToken ct = default);

    event Action<IStreamHandle>? OnInterrupted;
    event Action<IStreamHandle>? OnCompleted;
}
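
For readers implementing their own handle, here is one possible in-memory shape covering a subset of the members above. Everything in it is an assumption for illustration: the class name `InMemoryStreamHandle` is hypothetical, `WaitAsync` is assumed to mean "wait until `Complete()` has been called", and the code relies on APIs available from .NET 6 onward.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var handle = new InMemoryStreamHandle("stream-123");
handle.OnInterrupted += h => Console.WriteLine($"{h.StreamId} interrupted");
handle.OnCompleted += h => Console.WriteLine($"{h.StreamId} completed");

var waiter = handle.WaitAsync();
handle.Interrupt();
handle.Interrupt();   // second call is a no-op
handle.Complete();
await waiter;         // returns once Complete() has run

// Hypothetical implementation of a subset of IStreamHandle's members.
class InMemoryStreamHandle
{
    private readonly TaskCompletionSource _done =
        new(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _interrupted; // 0 = running, 1 = interrupted

    public InMemoryStreamHandle(string streamId) => StreamId = streamId;

    public string StreamId { get; }
    public bool IsInterrupted => Volatile.Read(ref _interrupted) == 1;
    public bool IsCompleted => _done.Task.IsCompleted;

    public event Action<InMemoryStreamHandle>? OnInterrupted;
    public event Action<InMemoryStreamHandle>? OnCompleted;

    public void Interrupt()
    {
        // Interlocked makes the transition (and the event) happen exactly once.
        if (Interlocked.Exchange(ref _interrupted, 1) == 0)
            OnInterrupted?.Invoke(this);
    }

    public void Complete()
    {
        // TrySetResult returns false on repeat calls, so the event fires once.
        if (_done.TrySetResult())
            OnCompleted?.Invoke(this);
    }

    public Task WaitAsync(CancellationToken ct = default) => _done.Task.WaitAsync(ct);
}
```

Making `Interrupt()` and `Complete()` idempotent keeps repeated clicks on a stop button from raising duplicate events.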

Best Practices

Use CancellationToken for Most Scenarios

csharp
// Simple, reliable, works everywhere
using var cts = new CancellationTokenSource();
await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
{
    // ...
}

Always Handle OperationCanceledException

csharp
try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
    {
        // ...
    }
}
catch (OperationCanceledException)
{
    // Clean up, show cancellation message
    Console.WriteLine("\n⚠ Operation cancelled");
}

Combine CancellationToken Sources

csharp
// Multiple cancellation sources
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var userCts = new CancellationTokenSource();
using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
    timeoutCts.Token,
    userCts.Token);

await foreach (var evt in agent.RunAsync(messages, cancellationToken: combinedCts.Token))
{
    // Cancels on EITHER timeout OR user action
}
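
When sources are linked, the caught exception does not say which one fired, but the individual tokens do. The sketch below uses `Task.Delay` as a stand-in for the agent loop, and the `Cancellation.Describe` helper is a hypothetical name introduced for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
using var userCts = new CancellationTokenSource();
using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
    timeoutCts.Token, userCts.Token);

try
{
    // Stand-in for the agent loop: a long delay that observes the combined token.
    await Task.Delay(TimeSpan.FromSeconds(10), combinedCts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine(Cancellation.Describe(timeoutCts.Token, userCts.Token));
}

static class Cancellation
{
    // Check the user token first so an explicit stop wins if both fired.
    public static string Describe(CancellationToken timeout, CancellationToken user) =>
        user.IsCancellationRequested ? "Stopped by user"
        : timeout.IsCancellationRequested ? "Timed out"
        : "Cancelled by another source";
}
```

Here only the timeout fires, so the program prints "Timed out"; calling `userCts.Cancel()` before the deadline would report a user stop instead.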

Don't Block the Event Stream

csharp
// WRONG: Blocks cancellation
await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
{
    Thread.Sleep(1000); // DON'T DO THIS!
}

// CORRECT: Use async operations
await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
{
    await Task.Delay(1000, ct); // Respects cancellation
}

Common Patterns

Pattern 1: Stop Button in UI

csharp
private CancellationTokenSource? _cts;

async Task StartAgentAsync()
{
    _cts = new CancellationTokenSource();

    try
    {
        await foreach (var evt in agent.RunAsync(messages, cancellationToken: _cts.Token))
        {
            // Handle events
        }
    }
    catch (OperationCanceledException)
    {
        // User stopped via button
    }
    finally
    {
        _cts?.Dispose();
        _cts = null;
    }
}

void OnStopButtonClick()
{
    _cts?.Cancel();
}

Pattern 2: Automatic Timeout

csharp
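using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMinutes(5));

try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        // Handle events until the 5-minute deadline
    }
}
catch (OperationCanceledException)
{
    // Deadline reached: the stream was cancelled after 5 minutes
}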

Pattern 3: Progress-Based Cancellation

csharp
using var cts = new CancellationTokenSource();
var eventCount = 0;

try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        eventCount++;

        // Cancel after 1000 events
        if (eventCount > 1000)
        {
            cts.Cancel();
        }
    }
}
catch (OperationCanceledException)
{
    // Event budget exceeded: the stream was cancelled
}

Released under the MIT License.